/**
 * FileName: StreamingJoin
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/18 20:13
 * Description: 联合外部数据
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.expr;

/**
 * Joins a Kafka stream with a Redis-backed Dataset (read as a batch source) using Spark
 * Structured Streaming, and prints the joined rows to the console.
 *
 * <p>Two ways of expressing the join are shown:
 * <ul>
 *   <li>a SQL query over temporary views, which is built but never started;</li>
 *   <li>the {@link Dataset#join} API, which is the query that actually runs.</li>
 * </ul>
 */
public class StreamingJoin {

    /** Name of the spark-redis "table" that is read as the static side of the join. */
    private static final String REDIS_TABLE = "Xdata";

    /**
     * Entry point: builds the session, wires the Kafka and Redis sources, starts the console sink
     * and blocks until the streaming query terminates. The session is stopped on exit.
     *
     * @param args unused
     * @throws IllegalStateException if the streaming query terminates with an error
     */
    public static void main(String[] args) {
        SparkSession sparkSession = createSparkSession();
        try {
            Dataset<Row> kafkaDataset = readKafkaStream(sparkSession);
            Dataset<Row> redisDataset = readRedisTable(sparkSession);

            // Approach 1: register temporary views and express the join in SQL.
            // NOTE(review): the resulting Dataset is discarded. It is analyzed here but never started
            // as a streaming query; only approach 2 below actually runs.
            kafkaDataset.createOrReplaceTempView("kafka");
            // createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
            redisDataset.createOrReplaceTempView("redis");
            sparkSession.sql("select * from kafka where phone in(select key from redis)");

            // Approach 2: join through the Dataset API and stream the result to the console.
            StreamingQuery query = startConsoleQuery(kafkaDataset, redisDataset);
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            // Propagate so the driver exits with a failure instead of terminating normally.
            throw new IllegalStateException("Streaming query terminated with an error", e);
        } finally {
            sparkSession.stop();
        }
    }

    /** Builds (or reuses) the Spark session, pointing the spark-redis connector at the configured server. */
    private static SparkSession createSparkSession() {
        return SparkSession
                .builder()
                .appName("KafkaJoinRedisApp")
                .config("spark.redis.host", ConfigurationManager.getProperty(Constants.REDIS_IP))
                .config("spark.redis.port", ConfigurationManager.getProperty(Constants.REDIS_PORT))
                .getOrCreate();
    }

    /** Subscribes to the configured Kafka topics as a streaming source. */
    private static Dataset<Row> readKafkaStream(SparkSession sparkSession) {
        return sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
                .option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
                // NOTE(review): Spark accepts "earliest", "latest" or a JSON offsets spec here.
                // Confirm that the KAFKA_AUTO_OFFSET_RESET property holds one of those values.
                .option("startingOffsets", ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
                .load();
    }

    /** Reads the Redis table as a static (batch) Dataset through the spark-redis data source. */
    private static Dataset<Row> readRedisTable(SparkSession sparkSession) {
        return sparkSession.read()
                .format("org.apache.spark.sql.redis")
                .option("table", REDIS_TABLE)
                .load();
    }

    /**
     * Starts a streaming query that joins the Kafka stream with the static Redis Dataset and
     * prints the {@code name} and {@code phone} columns to the console without truncation.
     *
     * @param kafkaDataset streaming side of the join
     * @param redisDataset static side of the join
     * @return the started streaming query
     */
    private static StreamingQuery startConsoleQuery(Dataset<Row> kafkaDataset, Dataset<Row> redisDataset) {
        // NOTE(review): the raw Kafka source only exposes key/value/topic/partition/offset/timestamp/
        // timestampType. "phone" and "name" therefore presumably have to be parsed out of "value"
        // first, and "key" may be ambiguous with Kafka's own key column. Confirm the intended schema.
        return kafkaDataset.join(redisDataset, expr("phone = key"))
                // Each selectExpr argument must be a single expression, so the columns are passed separately.
                .selectExpr("name", "phone")
                .writeStream()
                // Append is the valid mode here: Complete requires a streaming aggregation, and this
                // stream-static join has none.
                .outputMode(OutputMode.Append())
                .format("console")
                .option("truncate", "false")
                .start();
    }
}
